package com.xwj.controller;

import com.xwj.producer.KafkaProducer;
import com.xwj.util.NumUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author xwj
 * @date 2020/4/14
 */
@RequestMapping("/kafka")
@RestController
public class KafkaController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send/{msg}")
    public void send(@PathVariable(name = "msg") String msg) {
        kafkaProducer.send("xwj-topic-1", msg);
        kafkaProducer.send("xwj-topic-2", msg);
    }

    /**
     * 消息顺序消费
     * <p>
     * 我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序，比如我们同时发了 2 个消息，
     * 这 2 个消息对应的操作分别对应的数据库操作是：更改用户会员等级、根据会员等级计算订单价格。
     * 假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。
     * 我们知道 Kafka 中 Partition(分区)是真正保存消息的地方，我们发送的消息都被放在了这里。
     * 而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中，并且我们可以给特定 Topic 指定多个 Partition。
     * <p>
     * 每次添加消息到 Partition(分区) 的时候都会采用尾加法，如上图所示。
     * Kafka 只能为我们保证 Partition(分区) 中的消息有序，而不能保证 Topic(主题) 中的 Partition(分区) 的有序。
     * 消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量（offset）。
     * Kafka 通过偏移量（offset）来保证消息在分区内的顺序性。
     * 所以，我们就有一种很简单的保证消息消费顺序的方法：1 个 Topic 只对应一个 Partition。
     * 这样当然可以解决问题，但是破坏了 Kafka 的设计初衷。
     * <p>
     * Kafka 中发送 1 条消息的时候，可以指定 topic, partition, key,data（数据） 4 个参数。
     * 如果你发送消息的时候指定了 Partition 的话，所有消息都会被发送到指定的 Partition。
     * 并且，同一个 key 的消息可以保证只发送到同一个 partition，这个我们可以采用表/对象的 id 来作为 key 。
     * 总结一下，对于如何保证 Kafka 中消息消费的顺序，有了下面两种方法：
     * <p>
     * 1 个 Topic 只对应一个 Partition。
     * （推荐）发送消息的时候指定 key/Partition。
     * 当然不仅仅只有上面两种方法，上面两种方法是我觉得比较好理解的，
     * <p>
     * <p>
     * 接着，我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理，而处理比较耗时的话，比如处理一条消息耗时几十 ms，
     * 那么 1 秒钟只能处理几十条消息，这吞吐量太低了。而多个线程并发跑的话，顺序可能就乱掉了
     * 1：一个 topic，一个 partition，一个 consumer，内部单线程消费，单线程吞吐量太低，一般不会用这个。
     * 2：写 N 个内存 queue，具有相同 key 的数据都到同一个内存 queue；然后对于 N 个线程，每个线程分别消费一个内存 queue 即可，这样就能保证顺序性。
     *
     * @param msg
     */
    @GetMapping("/sequence/send/{msg}")
    public void sequenceConsumer(@PathVariable(name = "msg") String msg) {
        if (NumUtil.isOdd(Integer.valueOf(msg))) {
            kafkaProducer.send("xwj-topic-1", "odd-key", msg);
        } else {
            kafkaProducer.send("xwj-topic-1", "even-key", msg);
        }
    }
}
